iT邦幫忙

2024 iThome 鐵人賽

DAY 24
0
AI/ ML & Data

從點子構想到部署上線:機器學習專案的一生系列 第 24

[Day 24] Metaflow - Part 3. Model Training & Cloud Resources

  • 分享至 

  • xImage
  •  

前面兩天介紹了 Metaflow 的基本功能,以及用資料處理來示範如何使用 Metaflow。不過,Metaflow 作為一個強大的 workflow 管理工具,當然也要支援模型訓練 和 使用cloud resource 管理啦。

今天就讓我們來看看這邊的內容吧!


Model Training

除了資料處理以外,Metaflow 也可以用來訓練模型,以下是一個範例程式碼:

from metaflow import FlowSpec, step, Parameter

class GradientBoostedTreesFlow(FlowSpec):

    random_state = Parameter("seed", default=12)
    n_estimators = Parameter("n-est", default=10)
    eval_metric = Parameter("eval-metric", default='mlogloss')
    k_fold = Parameter("k", default=5)
    
    @step
    def start(self):
        from sklearn import datasets
        self.iris = datasets.load_iris()
        self.X = self.iris['data']
        self.y = self.iris['target']
        self.next(self.train_xgb)

    @step
    def train_xgb(self):
        from xgboost import XGBClassifier
        from sklearn.model_selection import cross_val_score
        self.clf = XGBClassifier(
            n_estimators=self.n_estimators,
            random_state=self.random_state,
            eval_metric=self.eval_metric,
            use_label_encoder=False)
        self.scores = cross_val_score(
            self.clf, self.X, self.y, cv=self.k_fold)
        self.next(self.end)
        
    @step
    def end(self):
        import numpy as np
        msg = "Gradient Boosted Trees Model Accuracy: {} \u00B1 {}%"
        self.mean = round(100*np.mean(self.scores), 3)
        self.std = round(100*np.std(self.scores), 3)
        print(msg.format(self.mean, self.std))

if __name__ == "__main__":
    GradientBoostedTreesFlow()

程式碼來源:https://docs.outerbounds.com/intro-tutorial-S2E2/

結合 Ray Data 和 Ray Train

Metaflow 當然也可以結合我們前幾天介紹過的分散式計算框架 Ray,例如以下的程式碼示範:

from metaflow import FlowSpec, step
import ray
from ray import train
import ray.train.torch
import torch
import torch.nn as nn

class CombinedRayFlow(FlowSpec):

    @step
    def start(self):
        ray.init()
        ### 使用 Ray Data 建立資料集 ###
        self.dataset = ray.data.range(10000).map(lambda x: {"feature": x, "label": x * 2})
        self.next(self.prepare_data)

    @step
    def prepare_data(self):
        ### 準備訓練和測試資料集 ###
        self.train_dataset = self.dataset.take(8000)
        self.test_dataset = self.dataset.skip(8000)
        self.next(self.train_model)

    @step
    def train_model(self):
    
        ### Ray Train 的 training function ###
        def train_func(config):
            model = nn.Linear(1, 1)
            optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
            train_data = train.get_dataset_shard("train")
            for epoch in range(10):
                for batch in train_data.iter_torch_batches(batch_size=32):
                    optimizer.zero_grad()
                    output = model(batch["feature"].float().unsqueeze(1))
                    loss = nn.MSELoss()(output, batch["label"].float().unsqueeze(1))
                    loss.backward()
                    optimizer.step()
            return {"loss": loss.item()}

        trainer = ray.train.torch.TorchTrainer(
            train_func,
            scaling_config=ray.train.ScalingConfig(num_workers=2),
            datasets={
                "train": ray.train.torch.TorchDatasetsConfig(self.train_dataset)
            }
        )
        result = trainer.fit()
        self.final_loss = result.metrics["loss"][-1]
        print(f"Final loss: {self.final_loss}")
        self.next(self.end)

    @step
    def end(self):
        print("Finished the flow!")
        ray.shutdown()

if __name__ == "__main__":
    CombinedRayFlow()

套件管理

為了確保套件的版本一致性,Metaflow 提供很方便的方式指定套件版本,只要在 flow 的前面加上 @conda_base,在裡面設定好套件版本內容,並在執行時指定 environment,例如 python playlist.py --environment=conda run 即可。

@conda_base(python="3.10.*",
    libraries={
        "datashader": "0.14.0",
        "pandas": "1.4.2",
        "pyarrow": "5.0.0",
        "numpy": "1.23.3",
    })
class Flow(FlowSpec):
    ...

Cloud Resources

除了在 local 計算以外,Metaflow 也支援多種 cloud platform,包括 AWS、Azure 和 Google Cloud,支援的功能如下:

  • AWS Batch 用於計算,以及 AWS Step Functions 用於 orchestrating workflows。
  • AWS with Kubernetes
  • Azure with Kubernetes
  • Google Cloud with Kubernetes

使用 cloud resources 的好處很多,包括可以直接使用他們提供的 infrastructure,我們不需要特別管理;也可以根據需求以自動調整資源,具有可擴展性。

至於要如何設定 cloud resources 的部署方式可以參考 [1],或是 Metaflow 也有提供 sandbox 讓我們玩玩看。

使用 AWS Cloud Resources

若我們把 AWS 的 config 都設定好之後,可以在 local 環境中,完全不改任何一行 code,就可以使用 AWS 的 Kubernetes。

我們只要執行以下這行指令即可:

python flow.py --environment=conda run -with kubernetes

在執行時,所有的 artifacts 都會存在 remote datastore 上,例如 AWS S3 上,不過在 local 也可以 用前面介紹過的方式直接查看內容,非常方便。

另外,可以使用 @batch(cpu=2, memory=2048) 設定要使用的 AWS batch 資源,以及加上 @retry,若有任何問題,Metaflow 會自動一直 retry。

簡單的程式碼示範如下:

from metaflow import FlowSpec, step, batch

class CloudFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.cloud_step)
        
    @retry
    @batch(cpu=2, memory=2048)   # 這個步驟將在AWS Batch上運行
    @step
    def cloud_step(self):
        import time
        time.sleep(10)
        self.next(self.end)

    @step
    def end(self):
        print("Finish!")

if __name__ == '__main__':
    CloudFlow()

以上就結束這三天份的 Metaflow 介紹啦!
Metaflow 官方的 tutorials 還有很多詳盡的用法,例如如何訓練 NLP、CV 和推薦系統的模型,以及部署的細節和參數設定,推薦大家都去玩玩看。
https://docs.outerbounds.com/tutorials-index/


Reference:
[1] https://docs.outerbounds.com/engineering/welcome/


上一篇
[Day 23] Metaflow - Part 2. Parameters、版本紀錄、Metaflow UI、Card
下一篇
[Day 25] Uber 慘痛的資料損失經驗,以及他們的解法——D3 監控系統
系列文
從點子構想到部署上線:機器學習專案的一生30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言